package com.ttx.rabbitmq.demo.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * Spring configuration for the RabbitMQ demo.
 *
 * <p>Declares three queues ({@link #CONFIRM_QUEUE_NAME}, {@link #BATCH_QUEUE_NAME},
 * {@link #RETRY_QUEUE_NAME} is only a name constant here) together with the beans
 * used by the batching and retry examples: a batch-enabled listener container
 * factory, a task scheduler, a {@link BatchingRabbitTemplate} and a
 * {@link RabbitTemplate} that retries failed operations.
 */
@Configuration
public class RabbitMqConfig2 {

    /** Queue name for the confirm example. */
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    /** Queue name used to try out batching. */
    public static final String BATCH_QUEUE_NAME = "batch.queue";

    /** Queue name for the retry example. */
    public static final String RETRY_QUEUE_NAME = "retry.queue";


    // ---------------------------- confirm

    /**
     * Declares the queue named {@link #CONFIRM_QUEUE_NAME}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue confirmQueue() {
        return new Queue(CONFIRM_QUEUE_NAME);
    }


    // ---------------------------- batch
    // Official documentation: https://docs.spring.io/spring-amqp/reference/html/#receiving-batch

    /**
     * Listener container factory whose listeners receive messages in batches.
     *
     * @param connectionFactory the RabbitMQ connection factory to attach to the containers
     * @return a container factory with batch listening and consumer-side batching
     *         switched on, and a batch size of 10
     */
    @Bean("batchQueueRabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        // Batch-related switches first; they are independent of the connection factory.
        containerFactory.setBatchSize(10);
        containerFactory.setConsumerBatchEnabled(true);
        containerFactory.setBatchListener(true);
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /**
     * Scheduler handed to the batching template (see {@link #batchQueueRabbitTemplate}).
     *
     * @return a new {@link ThreadPoolTaskScheduler} with default settings
     */
    @Bean("batchQueueTaskScheduler")
    public TaskScheduler batchQueueTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

    /**
     * Template that sends messages in batches.
     *
     * <p>Key point: "batching" means Spring packs several messages into a single
     * message, sends that one message to the broker, and after it is received it
     * is unpacked back into the individual messages.
     *
     * @param connectionFactory the RabbitMQ connection factory
     * @param taskScheduler     the scheduler bean named {@code batchQueueTaskScheduler}
     * @return a batching template backed by a {@link SimpleBatchingStrategy}
     */
    @Bean("batchQueueRabbitTemplate")
    public BatchingRabbitTemplate batchQueueRabbitTemplate(ConnectionFactory connectionFactory,
                                                           @Qualifier("batchQueueTaskScheduler") TaskScheduler taskScheduler) {
        // Number of messages packed into one batch.
        final int messagesPerBatch = 10;

        // Buffer limit in bytes (1 KB).
        // SimpleBatchingStrategy decides by checking whether the message count exceeds
        // the batch size or whether the accumulated message size exceeds the buffer
        // limit; the buffer limit mainly caps the size of the single assembled message.
        // If batching should be driven mostly by count, make the buffer larger.
        // For the detailed logic see SimpleBatchingStrategy#addToBatch().
        final int maxBatchBytes = 1024;

        // Timeout value handed to the batching strategy.
        final long batchTimeout = 10000;

        // Note: a simple batching strategy that supports only one exchange/routingKey.
        SimpleBatchingStrategy strategy =
                new SimpleBatchingStrategy(messagesPerBatch, maxBatchBytes, batchTimeout);
        return new BatchingRabbitTemplate(connectionFactory, strategy, taskScheduler);
    }

    /**
     * Declares the queue named {@link #BATCH_QUEUE_NAME}, used to test batching.
     *
     * @return the queue definition
     */
    @Bean
    public Queue batchQueue() {
        return new Queue(BATCH_QUEUE_NAME);
    }


    // ---------------------------------- retry
    // https://docs.spring.io/spring-amqp/reference/html/#template-retry

    /**
     * Template whose operations are retried according to {@link #buildRetryTemplate()}.
     *
     * @param connectionFactory the RabbitMQ connection factory
     * @return a {@link RabbitTemplate} with a retry template installed
     */
    @Bean("retryRabbitTemplate")
    public RabbitTemplate retryRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setRetryTemplate(buildRetryTemplate());
        return rabbitTemplate;
    }

    /**
     * Builds the retry template: at most 5 attempts with exponential back-off
     * starting at 5000, multiplied by 3.0 each time, capped at 30000.
     */
    private static RetryTemplate buildRetryTemplate() {
        // Maximum attempts. Note: only (maxAttempts - 1) retries happen; counting the
        // first execution, the operation runs at most maxAttempts times in total.
        SimpleRetryPolicy attemptLimit = new SimpleRetryPolicy();
        attemptLimit.setMaxAttempts(5);

        // Pause strategy between consecutive attempts.
        ExponentialBackOffPolicy pause = new ExponentialBackOffPolicy();
        // Upper bound for the pause interval.
        pause.setMaxInterval(30000);
        // Factor by which each pause grows relative to the previous one.
        pause.setMultiplier(3.0);
        // Pause before the first retry.
        pause.setInitialInterval(5000);

        RetryTemplate retry = new RetryTemplate();
        retry.setRetryPolicy(attemptLimit);
        retry.setBackOffPolicy(pause);
        return retry;
    }

}
